home *** CD-ROM | disk | FTP | other *** search
/ Complete Linux / Complete Linux.iso / docs / apps / database / postgres / postgre4.z / postgre4 / src / access / nbtree / nbtinsert.c < prev    next >
Encoding:
C/C++ Source or Header  |  1992-08-27  |  20.6 KB  |  705 lines

  1. /*
  2.  *  btinsert.c -- Item insertion in Lehman and Yao btrees for Postgres.
  3.  */
  4.  
  5. #include "tmp/c.h"
  6. #include "tmp/postgres.h"
  7.  
  8. #include "storage/bufmgr.h"
  9. #include "storage/bufpage.h"
  10. #include "storage/page.h"
  11.  
  12. #include "utils/log.h"
  13. #include "utils/rel.h"
  14. #include "utils/excid.h"
  15.  
  16. #include "access/heapam.h"
  17. #include "access/genam.h"
  18. #include "access/ftup.h"
  19. #include "access/nbtree.h"
  20.  
  21. RcsId("$Header: /private/postgres/src/access/nbtree/RCS/nbtinsert.c,v 1.11 1991/11/08 15:41:04 kemnitz Exp $");
  22.  
  23. /*
  24.  *  _bt_doinsert() -- Handle insertion of a single btitem in the tree.
  25.  *
  26.  *    This routine is called by the public interface routines, btbuild
  27.  *    and btinsert.  By here, btitem is filled in, and has a unique
  28.  *    (xid, seqno) pair.
  29.  */
  30.  
  31. InsertIndexResult
  32. _bt_doinsert(rel, btitem)
  33.     Relation rel;
  34.     BTItem btitem;
  35. {
  36.     ScanKey itup_scankey;
  37.     IndexTuple itup;
  38.     BTStack stack;
  39.     Buffer buf;
  40.     BlockNumber blkno;
  41.     OffsetIndex itup_off;
  42.     int natts;
  43.     InsertIndexResult res;
  44.  
  45.     itup = &(btitem->bti_itup);
  46.  
  47.     /* we need a scan key to do our search, so build one */
  48.     itup_scankey = _bt_mkscankey(rel, itup);
  49.     natts = rel->rd_rel->relnatts;
  50.  
  51.     /* find the page containing this key */
  52.     stack = _bt_search(rel, natts, itup_scankey, &buf);
  53.     blkno = BufferGetBlockNumber(buf);
  54.  
  55.     /* trade in our read lock for a write lock */
  56.     _bt_relbuf(rel, buf, BT_READ);
  57.     buf = _bt_getbuf(rel, blkno, BT_WRITE);
  58.  
  59.     /*
  60.      *  If the page was split between the time that we surrendered our
  61.      *  read lock and acquired our write lock, then this page may no
  62.      *  longer be the right place for the key we want to insert.  In this
  63.      *  case, we need to move right in the tree.  See Lehman and Yao for
  64.      *  an excruciatingly precise description.
  65.      */
  66.  
  67.     buf = _bt_moveright(rel, buf, natts, itup_scankey, BT_WRITE);
  68.  
  69.     /* do the insertion */
  70.     res = _bt_insertonpg(rel, buf, stack, natts, itup_scankey,
  71.              btitem, (BTItem) NULL);
  72.  
  73.     /* be tidy */
  74.     _bt_freestack(stack);
  75.     _bt_freeskey(itup_scankey);
  76.  
  77.     return (res);
  78. }
  79.  
  80. /*
  81.  *  _bt_insertonpg() -- Insert a tuple on a particular page in the index.
  82.  *
  83.  *    This recursive procedure does the following things:
  84.  *
  85.  *        +  if necessary, splits the target page.
  86.  *        +  finds the right place to insert the tuple (taking into
  87.  *           account any changes induced by a split).
  88.  *        +  inserts the tuple.
  89.  *        +  if the page was split, pops the parent stack, and finds the
  90.  *           right place to insert the new child pointer (by walking
  91.  *           right using information stored in the parent stack).
  92.  *        +  invoking itself with the appropriate tuple for the right
  93.  *           child page on the parent.
  94.  *
  95.  *    On entry, we must have the right buffer on which to do the
  96.  *    insertion, and the buffer must be pinned and locked.  On return,
  97.  *    we will have dropped both the pin and the write lock on the buffer.
  98.  *
  99.  *    The locking interactions in this code are critical.  You should
  100.  *    grok Lehman and Yao's paper before making any changes.  In addition,
  101.  *    you need to understand how we disambiguate duplicate keys in this
  102.  *    implementation, in order to be able to find our location using
  103.  *    L&Y "move right" operations.  Since we may insert duplicate user
  104.  *    keys, and since these dups may propogate up the tree, we use the
  105.  *    'afteritem' parameter to position ourselves correctly for the
  106.  *    insertion on internal pages.
  107.  */
  108.  
  109. InsertIndexResult
  110. _bt_insertonpg(rel, buf, stack, keysz, scankey, btitem, afteritem)
  111.     Relation rel;
  112.     Buffer buf;
  113.     BTStack stack;
  114.     int keysz;
  115.     ScanKey scankey;
  116.     BTItem btitem;
  117.     BTItem afteritem;
  118. {
  119.     InsertIndexResult res;
  120.     Page page;
  121.     Buffer rbuf;
  122.     Buffer pbuf;
  123.     Page rpage;
  124.     ScanKey newskey;
  125.     BTItem ritem;
  126.     BTPageOpaque rpageop;
  127.     BlockNumber rbknum, itup_blkno;
  128.     OffsetIndex itup_off;
  129.     int itemsz;
  130.     int ritemsz;
  131.     ItemId hikey;
  132.     InsertIndexResult newres;
  133.     BTItem new_item = (BTItem) NULL;
  134.  
  135.     page = BufferGetPage(buf, 0);
  136.     itemsz = IndexTupleDSize(btitem->bti_itup)
  137.        + (sizeof(BTItemData) - sizeof(IndexTupleData));
  138.  
  139.     if (PageGetFreeSpace(page) < itemsz) {
  140.  
  141.     /* split the buffer into left and right halves */
  142.     rbuf = _bt_split(rel, buf);
  143.  
  144.     /* which new page (left half or right half) gets the tuple? */
  145.     if (_bt_goesonpg(rel, buf, keysz, scankey, afteritem)) {
  146.         itup_off = _bt_pgaddtup(rel, buf, keysz, scankey,
  147.                     itemsz, btitem, afteritem);
  148.         itup_blkno = BufferGetBlockNumber(buf);
  149.     } else {
  150.         itup_off = _bt_pgaddtup(rel, rbuf, keysz, scankey,
  151.                     itemsz, btitem, afteritem);
  152.         itup_blkno = BufferGetBlockNumber(rbuf);
  153.     }
  154.  
  155.     /*
  156.      *  By here,
  157.      *
  158.      *    +  our target page has been split;
  159.      *    +  the original tuple has been inserted;
  160.      *    +  we have write locks on both the old (left half) and new
  161.      *       (right half) buffers, after the split; and
  162.      *    +  we have the key we want to insert into the parent.
  163.      *
  164.      *  Do the parent insertion.  We need to hold onto the locks for
  165.      *  the child pages until we locate the parent, but we can release
  166.      *  them before doing the actual insertion (see Lehman and Yao for
  167.      *  the reasoning).
  168.      */
  169.  
  170.     if (stack == (BTStack) NULL) {
  171.  
  172.         /* create a new root node and release the split buffers */
  173.         _bt_newroot(rel, buf, rbuf);
  174.         _bt_relbuf(rel, buf, BT_WRITE);
  175.         _bt_relbuf(rel, rbuf, BT_WRITE);
  176.  
  177.     } else {
  178.  
  179.         /* form a index tuple that points at the new right page */
  180.         rbknum = BufferGetBlockNumber(rbuf);
  181.         rpage = BufferGetPage(rbuf, 0);
  182.         rpageop = (BTPageOpaque) PageGetSpecialPointer(rpage);
  183.  
  184.         /*
  185.          *  By convention, the first entry (0) on every non-leftmost page
  186.          *  is the high key for that page.  In order to get the lowest key
  187.          *  on the new right page, we actually look at its second (1) entry.
  188.          */
  189.  
  190.         if (rpageop->btpo_next != P_NONE)
  191.         ritem = (BTItem) PageGetItem(rpage, PageGetItemId(rpage, 1));
  192.         else
  193.         ritem = (BTItem) PageGetItem(rpage, PageGetItemId(rpage, 0));
  194.  
  195.         /* get a unique btitem for this key */
  196.         new_item = _bt_formitem(&(ritem->bti_itup));
  197.  
  198.         ItemPointerSet(&(new_item->bti_itup.t_tid), 0, rbknum, 0, 1);
  199.  
  200.         /* find the parent buffer */
  201.         pbuf = _bt_getstackbuf(rel, stack, BT_WRITE);
  202.  
  203.         /* don't need the children anymore */
  204.         _bt_relbuf(rel, buf, BT_WRITE);
  205.         _bt_relbuf(rel, rbuf, BT_WRITE);
  206.  
  207.         newskey = _bt_mkscankey(rel, &(new_item->bti_itup));
  208.         newres = _bt_insertonpg(rel, pbuf, stack->bts_parent,
  209.                     keysz, newskey, new_item,
  210.                     stack->bts_btitem);
  211.  
  212.         /* be tidy */
  213.         pfree(newres);
  214.         pfree(newskey);
  215.         pfree(new_item);
  216.     }
  217.     } else {
  218.     itup_off = _bt_pgaddtup(rel, buf, keysz, scankey,
  219.                 itemsz, btitem, afteritem);
  220.     itup_blkno = BufferGetBlockNumber(buf);
  221.  
  222.     _bt_relbuf(rel, buf, BT_WRITE);
  223.     }
  224.  
  225.     /* by here, the new tuple is inserted */
  226.     res = (InsertIndexResult) palloc(sizeof(InsertIndexResultData));
  227.     ItemPointerSet(&(res->pointerData), 0, itup_blkno, 0, itup_off);
  228.     res->lock = (RuleLock) NULL;
  229.     res->offset = (double) 0;
  230.  
  231.     return (res);
  232. }
  233.  
  234. /*
  235.  *  _bt_split() -- split a page in the btree.
  236.  *
  237.  *    On entry, buf is the page to split, and is write-locked and pinned.
  238.  *    Returns the new right sibling of buf, pinned and write-locked.  The
  239.  *    pin and lock on buf are maintained.
  240.  */
  241.  
  242. Buffer
  243. _bt_split(rel, buf)
  244.     Relation rel;
  245.     Buffer buf;
  246. {
  247.     Buffer rbuf;
  248.     Page origpage;
  249.     Page leftpage, rightpage;
  250.     BlockNumber lbkno, rbkno;
  251.     BTPageOpaque ropaque, lopaque, oopaque;
  252.     Buffer sbuf;
  253.     Page spage;
  254.     BTPageOpaque sopaque;
  255.     Size itemsz;
  256.     ItemId itemid;
  257.     BTItem item;
  258.     int nleft, nright;
  259.     OffsetIndex start;
  260.     OffsetIndex maxoff;
  261.     OffsetIndex firstright;
  262.     OffsetIndex i;
  263.     Size llimit;
  264.  
  265.     rbuf = _bt_getbuf(rel, P_NEW, BT_WRITE);
  266.     origpage = BufferGetPage(buf, 0);
  267.     leftpage = PageGetTempPage(origpage, sizeof(BTPageOpaqueData));
  268.     rightpage = BufferGetPage(rbuf, 0);
  269.  
  270.     _bt_pageinit(rightpage);
  271.     _bt_pageinit(leftpage);
  272.  
  273.     /* init btree private data */
  274.     oopaque = (BTPageOpaque) PageGetSpecialPointer(origpage);
  275.     lopaque = (BTPageOpaque) PageGetSpecialPointer(leftpage);
  276.     ropaque = (BTPageOpaque) PageGetSpecialPointer(rightpage);
  277.  
  278.     /* if we're splitting this page, it won't be the root when we're done */
  279.     oopaque->btpo_flags &= ~BTP_ROOT;
  280.     lopaque->btpo_flags = ropaque->btpo_flags = oopaque->btpo_flags;
  281.     lopaque->btpo_prev = oopaque->btpo_prev;
  282.     ropaque->btpo_prev = BufferGetBlockNumber(buf);
  283.     lopaque->btpo_next = BufferGetBlockNumber(rbuf);
  284.     ropaque->btpo_next = oopaque->btpo_next;
  285.  
  286.     /*
  287.      *  If the page we're splitting is not the rightmost page at its level
  288.      *  in the tree, then the first (0) entry on the page is the high key
  289.      *  for the page.  We need to copy that to the right half.  Otherwise,
  290.      *  we should treat the line pointers beginning at zero as user data.
  291.      *
  292.      *  We leave a blank space at the start of the line table for the left
  293.      *  page.  We'll come back later and fill it in with the high key item
  294.      *  we get from the right key.
  295.      */
  296.  
  297.     nleft = 2;
  298.     nright = 1;
  299.     if ((ropaque->btpo_next = oopaque->btpo_next) != P_NONE) {
  300.     start = 1;
  301.     itemid = PageGetItemId(origpage, 0);
  302.     itemsz = ItemIdGetLength(itemid);
  303.     item = (BTItem) PageGetItem(origpage, itemid);
  304.     PageAddItem(rightpage, (Item) item, itemsz, nright++, LP_USED);
  305.     } else {
  306.     start = 0;
  307.     }
  308.     maxoff = PageGetMaxOffsetIndex(origpage);
  309.     llimit = PageGetFreeSpace(leftpage) / 2;
  310.     firstright = _bt_findsplitloc(rel, origpage, start, maxoff, llimit);
  311.  
  312.     for (i = start; i <= maxoff; i++) {
  313.     itemid = PageGetItemId(origpage, i);
  314.     itemsz = ItemIdGetLength(itemid);
  315.     item = (BTItem) PageGetItem(origpage, itemid);
  316.  
  317.     /* decide which page to put it on */
  318.     if (i < firstright)
  319.         PageAddItem(leftpage, (Item) item, itemsz, nleft++, LP_USED);
  320.     else
  321.         PageAddItem(rightpage, (Item) item, itemsz, nright++, LP_USED);
  322.     }
  323.  
  324.     /*
  325.      *  Okay, page has been split, high key on right page is correct.  Now
  326.      *  set the high key on the left page to be the min key on the right
  327.      *  page.
  328.      */
  329.  
  330.     if (ropaque->btpo_next == P_NONE)
  331.     itemid = PageGetItemId(rightpage, 0);
  332.     else
  333.     itemid = PageGetItemId(rightpage, 1);
  334.     itemsz = ItemIdGetLength(itemid);
  335.     item = (BTItem) PageGetItem(rightpage, itemid);
  336.  
  337.     /*
  338.      *  We left a hole for the high key on the left page; fill it.  The
  339.      *  modal crap is to tell the page manager to put the new item on the
  340.      *  page and not screw around with anything else.  Whoever designed
  341.      *  this interface has presumably crawled back into the dung heap they
  342.      *  came from.  No one here will admit to it.
  343.      */
  344.  
  345.     PageManagerModeSet(OverwritePageManagerMode);
  346.     PageAddItem(leftpage, (Item) item, itemsz, 1, LP_USED);
  347.     PageManagerModeSet(ShufflePageManagerMode);
  348.  
  349.     /*
  350.      *  By here, the original data page has been split into two new halves,
  351.      *  and these are correct.  The algorithm requires that the left page
  352.      *  never move during a split, so we copy the new left page back on top
  353.      *  of the original.  Note that this is not a waste of time, since we
  354.      *  also require (in the page management code) that the center of a
  355.      *  page always be clean, and the most efficient way to guarantee this
  356.      *  is just to compact the data by reinserting it into a new left page.
  357.      */
  358.  
  359.     PageRestoreTempPage(leftpage, origpage);
  360.  
  361.     /* write these guys out */
  362.     _bt_wrtnorelbuf(rel, rbuf);
  363.     _bt_wrtnorelbuf(rel, buf);
  364.  
  365.     /*
  366.      *  Finally, we need to grab the right sibling (if any) and fix the
  367.      *  prev pointer there.  We are guaranteed that this is deadlock-free
  368.      *  since no other writer will be moving holding a lock on that page
  369.      *  and trying to move left, and all readers release locks on a page
  370.      *  before trying to fetch its neighbors.
  371.      */
  372.  
  373.     if (ropaque->btpo_next != P_NONE) {
  374.     sbuf = _bt_getbuf(rel, ropaque->btpo_next, BT_WRITE);
  375.     spage = BufferGetPage(sbuf, 0);
  376.     sopaque = (BTPageOpaque) PageGetSpecialPointer(spage);
  377.     sopaque->btpo_prev = BufferGetBlockNumber(rbuf);
  378.  
  379.     /* write and release the old right sibling */
  380.     _bt_wrtbuf(rel, sbuf);
  381.     }
  382.  
  383.     /* split's done */
  384.     return (rbuf);
  385. }
  386.  
  387. /*
  388.  *  _bt_findsplitloc() -- find a safe place to split a page.
  389.  *
  390.  *    In order to guarantee the proper handling of searches for duplicate
  391.  *    keys, the first duplicate in the chain must either be the first
  392.  *    item on the page after the split, or the entire chain must be on
  393.  *    one of the two pages.  That is,
  394.  *        [1 2 2 2 3 4 5]
  395.  *    must become
  396.  *        [1] [2 2 2 3 4 5]
  397.  *    or
  398.  *        [1 2 2 2] [3 4 5]
  399.  *    but not
  400.  *        [1 2 2] [2 3 4 5].
  401.  *    However,
  402.  *        [2 2 2 2 2 3 4]
  403.  *    may be split as
  404.  *        [2 2 2 2] [2 3 4].
  405.  */
  406.  
  407. _bt_findsplitloc(rel, page, start, maxoff, llimit)
  408.     Relation rel;
  409.     Page page;
  410.     OffsetIndex start;
  411.     OffsetIndex maxoff;
  412.     Size llimit;
  413. {
  414.     OffsetIndex i;
  415.     OffsetIndex saferight;
  416.     ItemId nxtitemid, safeitemid;
  417.     BTItem safeitem, nxtitem;
  418.     IndexTuple safetup, nxttup;
  419.     Size nbytes;
  420.     TupleDescriptor itupdesc;
  421.     int natts;
  422.     int attno;
  423.     Datum attsafe;
  424.     Datum attnext;
  425.     Boolean null;
  426.  
  427.     itupdesc = RelationGetTupleDescriptor(rel);
  428.     natts = rel->rd_rel->relnatts;
  429.  
  430.     saferight = start;
  431.     safeitemid = PageGetItemId(page, saferight);
  432.     nbytes = ItemIdGetLength(safeitemid) + sizeof(ItemIdData);
  433.     safeitem = (BTItem) PageGetItem(page, safeitemid);
  434.     safetup = &(safeitem->bti_itup);
  435.  
  436.     i = start + 1;
  437.  
  438.     while (nbytes < llimit) {
  439.  
  440.     /* check the next item on the page */
  441.     nxtitemid = PageGetItemId(page, i);
  442.     nbytes += (ItemIdGetLength(nxtitemid) + sizeof(ItemIdData));
  443.     nxtitem = (BTItem) PageGetItem(page, nxtitemid);
  444.     nxttup = &(nxtitem->bti_itup);
  445.  
  446.     /* test against last known safe item */
  447.     for (attno = 1; attno <= natts; attno++) {
  448.         attsafe = (Datum) IndexTupleGetAttributeValue(safetup, attno,
  449.                               itupdesc, &null);
  450.         attnext = (Datum) IndexTupleGetAttributeValue(nxttup, attno,
  451.                               itupdesc, &null);
  452.  
  453.         /*
  454.          *  If the tuple we're looking at isn't equal to the last safe one
  455.          *  we saw, then it's our new safe tuple.
  456.          */
  457.  
  458.         if (!_bt_invokestrat(rel, attno, BTEqualStrategyNumber,
  459.                  attsafe, attnext)) {
  460.         safetup = nxttup;
  461.         saferight = i;
  462.  
  463.         /* break is for the attno for loop */
  464.         break;
  465.         }
  466.     }
  467.     i++;
  468.     }
  469.  
  470.     /*
  471.      *  If the chain of dups starts at the beginning of the page and extends
  472.      *  past the halfway mark, we can split it in the middle.
  473.      */
  474.  
  475.     if (saferight == start)
  476.     saferight = i;
  477.  
  478.     return (saferight);
  479. }
  480.  
  481. /*
  482.  *  _bt_newroot() -- Create a new root page for the index.
  483.  *
  484.  *    We've just split the old root page and need to create a new one.
  485.  *    In order to do this, we add a new root page to the file, then lock
  486.  *    the metadata page and update it.  This is guaranteed to be deadlock-
  487.  *    free, because all readers release their locks on the metadata page
  488.  *    before trying to lock the root, and all writers lock the root before
  489.  *    trying to lock the metadata page.  We have a write lock on the old
  490.  *    root page, so we have not introduced any cycles into the waits-for
  491.  *    graph.
  492.  *
  493.  *    On entry, lbuf (the old root) and rbuf (its new peer) are write-
  494.  *    locked.  We don't drop the locks in this routine; that's done by
  495.  *    the caller.  On exit, a new root page exists with entries for the
  496.  *    two new children.  The new root page is neither pinned nor locked.
  497.  */
  498.  
  499. _bt_newroot(rel, lbuf, rbuf)
  500.     Relation rel;
  501.     Buffer lbuf;
  502.     Buffer rbuf;
  503. {
  504.     Buffer rootbuf;
  505.     Page lpage, rpage, rootpage;
  506.     BlockNumber lbkno, rbkno;
  507.     BlockNumber rootbknum;
  508.     BTPageOpaque rootopaque;
  509.     ItemId itemid;
  510.     BTItem item;
  511.     Size itemsz;
  512.     BTItem new_item;
  513.  
  514.     /* get a new root page */
  515.     rootbuf = _bt_getbuf(rel, P_NEW, BT_WRITE);
  516.     rootpage = BufferGetPage(rootbuf, 0);
  517.     _bt_pageinit(rootpage);
  518.  
  519.     /* set btree special data */
  520.     rootopaque = (BTPageOpaque) PageGetSpecialPointer(rootpage);
  521.     rootopaque->btpo_prev = rootopaque->btpo_next = P_NONE;
  522.     rootopaque->btpo_flags |= BTP_ROOT;
  523.  
  524.     /*
  525.      *  Insert the internal tuple pointers.
  526.      */
  527.  
  528.     lbkno = BufferGetBlockNumber(lbuf);
  529.     rbkno = BufferGetBlockNumber(rbuf);
  530.     lpage = BufferGetPage(lbuf, 0);
  531.     rpage = BufferGetPage(rbuf, 0);
  532.  
  533.     /* step over the high key on the left page */
  534.     itemid = PageGetItemId(lpage, 1);
  535.     itemsz = ItemIdGetLength(itemid);
  536.     item = (BTItem) PageGetItem(lpage, itemid);
  537.     new_item = _bt_formitem(&(item->bti_itup));
  538.     ItemPointerSet(&(new_item->bti_itup.t_tid), 0, lbkno, 0, 1);
  539.  
  540.     /* insert the left page pointer */
  541.     PageAddItem(rootpage, (Item) new_item, itemsz, 1, LP_USED);
  542.     pfree(new_item);
  543.  
  544.     itemid = PageGetItemId(rpage, 0);
  545.     itemsz = ItemIdGetLength(itemid);
  546.     item = (BTItem) PageGetItem(rpage, itemid);
  547.     new_item = _bt_formitem(&(item->bti_itup));
  548.     ItemPointerSet(&(new_item->bti_itup.t_tid), 0, rbkno, 0, 1);
  549.  
  550.     /* insert the right page pointer */
  551.     PageAddItem(rootpage, (Item) new_item, itemsz, 2, LP_USED);
  552.     pfree (new_item);
  553.  
  554.     /* write and let go of the root buffer */
  555.     rootbknum = BufferGetBlockNumber(rootbuf);
  556.     _bt_wrtbuf(rel, rootbuf);
  557.  
  558.     /* update metadata page with new root block number */
  559.     _bt_metaproot(rel, rootbknum);
  560. }
  561.  
  562. /*
  563.  *  _bt_pgaddtup() -- add a tuple to a particular page in the index.
  564.  *
  565.  *    This routine adds the tuple to the page as requested, and keeps the
  566.  *    write lock and reference associated with the page's buffer.  It is
  567.  *    an error to call pgaddtup() without a write lock and reference.  If
  568.  *    afteritem is non-null, it's the item that we expect our new item
  569.  *    to follow.  Otherwise, we do a binary search for the correct place
  570.  *    and insert the new item there.
  571.  */
  572.  
  573. OffsetIndex
  574. _bt_pgaddtup(rel, buf, keysz, itup_scankey, itemsize, btitem, afteritem)
  575.     Relation rel;
  576.     Buffer buf;
  577.     int keysz;
  578.     ScanKey itup_scankey;
  579.     Size itemsize;
  580.     BTItem btitem;
  581.     BTItem afteritem;
  582. {
  583.     OffsetIndex itup_off, maxoff;
  584.     OffsetIndex first;
  585.     ItemId itemid;
  586.     BTItem olditem;
  587.     Page page;
  588.     BTPageOpaque opaque;
  589.     BTItem chkitem;
  590.     OID afteroid;
  591.  
  592.     page = BufferGetPage(buf, 0);
  593.     opaque = (BTPageOpaque) PageGetSpecialPointer(page);
  594.     if (opaque->btpo_next == P_NONE)
  595.     first = 0;
  596.     else
  597.     first = 1;
  598.  
  599.     if (afteritem == (BTItem) NULL) {
  600.     itup_off = _bt_binsrch(rel, buf, keysz, itup_scankey, BT_INSERTION) + 1;
  601.     } else {
  602.     afteroid = afteritem->bti_oid;
  603.     itup_off = first;
  604.  
  605.     for (;;) {
  606.         chkitem = (BTItem) PageGetItem(page,
  607.                        PageGetItemId(page, itup_off++));
  608.         if (chkitem->bti_oid == afteroid)
  609.         break;
  610.     }
  611.  
  612.     /* we want to be after the item */
  613.     itup_off++;
  614.     }
  615.  
  616.     PageAddItem(page, (Item) btitem, itemsize, itup_off, LP_USED);
  617.  
  618.     /* write the buffer, but hold our lock */
  619.     _bt_wrtnorelbuf(rel, buf);
  620.  
  621.     return (itup_off);
  622. }
  623.  
  624. /*
  625.  *  _bt_goesonpg() -- Does a new tuple belong on this page?
  626.  *
  627.  *    This is part of the complexity introduced by allowing duplicate
  628.  *    keys into the index.  The tuple belongs on this page if:
  629.  *
  630.  *        + there is no page to the right of this one; or
  631.  *        + it is less than the high key on the page; or
  632.  *        + the item it is to follow ("afteritem") appears on this
  633.  *          page.
  634.  */
  635.  
  636. bool
  637. _bt_goesonpg(rel, buf, keysz, scankey, afteritem)
  638.     Relation rel;
  639.     Buffer buf;
  640.     Size keysz;
  641.     ScanKey scankey;
  642.     BTItem afteritem;
  643. {
  644.     Page page;
  645.     ItemId hikey;
  646.     BTPageOpaque opaque;
  647.     ItemId itemid;
  648.     BTItem chkitem;
  649.     OffsetIndex offind, maxoff;
  650.     OID afteroid;
  651.     bool found;
  652.  
  653.     page = BufferGetPage(buf, 0);
  654.  
  655.     /* no right neighbor? */
  656.     opaque = (BTPageOpaque) PageGetSpecialPointer(page);
  657.     if (opaque->btpo_next == P_NONE)
  658.     return (true);
  659.  
  660.     /* < the high key? */
  661.     hikey = PageGetItemId(page, 0);
  662.     if (_bt_skeycmp(rel, keysz, scankey, page, hikey, BTLessStrategyNumber))
  663.     return (true);
  664.  
  665.     /*
  666.      *  If the scan key is > the high key, then it for sure doesn't belong
  667.      *  here.
  668.      */
  669.  
  670.     if (_bt_skeycmp(rel, keysz, scankey, page, hikey, BTGreaterStrategyNumber))
  671.     return (false);
  672.  
  673.     /*
  674.      *  If we have no adjacency information, and the item is equal to the
  675.      *  high key on the page (by here it is), then the item does not belong
  676.      *  on this page.
  677.      */
  678.  
  679.     if (afteritem == (BTItem) NULL)
  680.     return (false);
  681.  
  682.     /* damn, have to work for it.  i hate that. */
  683.     afteroid = afteritem->bti_oid;
  684.     maxoff = PageGetMaxOffsetIndex(page);
  685.  
  686.     /*
  687.      *  Search the entire page for the afteroid.  We need to do this, rather
  688.      *  than doing a binary search and starting from there, because if the
  689.      *  key we're searching for is the leftmost key in the tree at this
  690.      *  level, then a binary search will do the wrong thing.  Splits are
  691.      *  pretty infrequent, so the cost isn't as bad as it could be.
  692.      */
  693.  
  694.     found = false;
  695.     for (offind = 1; offind <= maxoff; offind++) {
  696.     chkitem = (BTItem) PageGetItem(page, PageGetItemId(page, offind));
  697.     if (chkitem->bti_oid == afteroid) {
  698.         found = true;
  699.         break;
  700.     }
  701.     }
  702.  
  703.     return (found);
  704. }
  705.